%%%-------------------------------------------------------------------
%%% @author wukai
%%% @copyright (C) 2017, <COMPANY>
%%% @doc
%%%
%%% @end
%%% Created : 15. 十一月 2017 14:42
%%%-------------------------------------------------------------------
-module(mqtt_process).
-behavior(ranch_protocol).

-include_lib("ssl/src/ssl_api.hrl").
-author("wukai").
-include("../include/mqtt_protocol.hrl").

-export([start_link/4, init/4, exit_self/0, send_msg/2, loop/2]).
-export([send_binary/2]).
%% @doc ranch_protocol entry point: spawns a linked connection process
%% running init/4 with the given arguments, logs the new connection and
%% returns `{ok, Pid}' for the spawned process.
start_link(Ref, Socket, Transport, Opts) ->
  InitArgs = [Ref, Socket, Transport, Opts],
  ConnPid = erlang:spawn_link(?MODULE, init, InitArgs),
  lager:log(info, self(), "new connection socket:~p, pid:~p\n", [Socket, ConnPid]),
  {ok, ConnPid}.

%% @doc Body of the connection process spawned by start_link/4.
%% Acknowledges the ranch accept, switches the socket to active mode with
%% nodelay, and enters loop/2 with a fresh #state{}.
%% Only an empty option list is accepted (any other Opts is a function_clause).
%%
%% First clause: the socket is an #sslsocket{} record, options are set
%% through ranch_ssl.
init(Ref, Socket, Transport, []) when is_record(Socket, sslsocket) ->
  ranch:accept_ack(Ref),
  ranch_ssl:setopts(Socket, [{active, true}, {nodelay, true}]),
  loop(Ref, initial_state(Socket, Transport));

%% Second clause: any other socket, options are set through inet and
%% linger is additionally disabled.
init(Ref, Socket, Transport, []) ->
  ranch:accept_ack(Ref),
  inet:setopts(Socket, [{active, true}, {nodelay, true}, {linger, {false, 0}}]),
  loop(Ref, initial_state(Socket, Transport)).

%% Builds the state a connection starts with. Both socket kinds use
%% link_type = tcp, so send_binary/2 writes through the transport module.
initial_state(Socket, Transport) ->
  #state{
    socket = Socket,
    trans = Transport,
    packetids = [],
    wait_seg = wait_head,
    link_type = tcp
  }.

%% @doc Main receive loop of a connection process.
%%
%% Handles, in active socket mode:
%%   {tcp, _, Bin} / {ssl, _, Bin}    - incoming bytes, fed to
%%                                      mqtt_protocol:rec_bin/3 which returns
%%                                      the next state;
%%   {tcp_closed, _} / {ssl_closed, _} - peer closed, see closed/1;
%%   {tcp_error, _, R} / {ssl_error, _, R} - logged, then treated as closed;
%%   {publish, Msg}                   - outgoing publish, see pub/3;
%%   {reconn, NewPid}                 - see reconnection/1;
%%   anything else                    - logged and dropped so the mailbox
%%                                      does not fill up.
%% If no message at all arrives within State#state.keep_alive (used directly
%% as the receive timeout), timeout/1 terminates the process.
loop(Ref, State) ->
  receive
    {tcp, _, Bin} ->
      NewState = mqtt_protocol:rec_bin(Ref, State, Bin),
      loop(Ref, NewState);
    {tcp_closed, _} ->
      closed(State);
    {tcp_error, _, Reason} ->
      %% The format string needs a ~p for the Reason argument.
      lager:log(info, self(), "tcp error reason:~p", [Reason]),
      closed(State);
    {publish, Msg} ->
      loop(Ref, pub(Ref, State, Msg));
    %% Connection re-established elsewhere (NewPid is presumably the
    %% process that took over this client - confirm with the sender).
    {reconn, NewPid} ->
      reconnection(NewPid);
    {ssl, _, Bin} ->
      NewState = mqtt_protocol:rec_bin(Ref, State, Bin),
      loop(Ref, NewState);
    %% Active-mode ssl delivers the 2-tuple {ssl_closed, Socket}; a 3-tuple
    %% pattern never matches and the close would be swallowed by the
    %% catch-all clause below.
    {ssl_closed, _} ->
      closed(State),
      exit_self();
    {ssl_error, _, Reason} ->
      lager:log(info, self(), "ssl error reason:~p", [Reason]),
      closed(State);
    M ->
      log:log("other msg ~p", [M]),
      loop(Ref, State)
  after State#state.keep_alive ->
    timeout(State)
  end.

%% @doc Terminates the calling process by sending itself an untrappable
%% `kill' exit signal.
exit_self() ->
  Self = self(),
  exit(Self, kill).

%% @doc Asks the connection process Pid to publish M by sending it
%% `{publish, M}' (handled in loop/2). Returns the message that was sent.
send_msg(Pid, M) ->
  erlang:send(Pid, {publish, M}).

%% @doc Called when loop/2 received nothing within the keep-alive window.
%% When a client is attached to the state, the timeout is logged and
%% session_server is told the client went offline before the process kills
%% itself.
timeout(#state{client = Client, clean_session = CleanSession})
  when Client =/= undefined ->
  lager:log(info, self(), "received time out"),
  session_server:send_offline(Client, CleanSession),
  exit(self(), kill);

%% No client attached (or not a #state{} at all): just terminate.
timeout(_) ->
  exit(self(), kill).

%% @doc Handles a closed (or failed) socket and terminates the process.
%%
%% clean_session == 1: session_server is notified that the client is offline.
%% clean_session == 0: messages still held in the process dictionary are
%%                     first handed over via unarrived/1, then session_server
%%                     is notified.
%% Anything else:      the process is killed without notifying anyone.
closed(State) when State#state.clean_session == 1 ->
  session_server:send_offline(State#state.client, State#state.clean_session),
  exit(self(), kill);
closed(State) when State#state.clean_session == 0 ->
  unarrived(State#state.client),
  session_server:send_offline(State#state.client, State#state.clean_session),
  exit(self(), kill);
%% init/4 does not set clean_session, so a socket that closes before the
%% session is established would otherwise die with function_clause.
%% Mirrors the fallback clause of timeout/1.
%% NOTE(review): assumes clean_session has no 0/1 default in the record - confirm.
closed(_State) ->
  exit(self(), kill).

%% @doc Encodes publish message P, sends it on the connection and returns
%% the (possibly updated) state.
%%
%% For QoS > 0 a packet id is taken with packet_id/1, the message is cached
%% under that id with cache_msg/2 and the frame is encoded with it.
%% For QoS 0 the frame is encoded with id 0 and the state is unchanged.
pub(_Ref, State, P) ->
  Qos = publish_frame:qos(P),
  {Frame, NextState} =
    case Qos of
      _ when Qos > 0 ->
        {PacketId, WithId} = packet_id(State),
        cache_msg(PacketId, P),
        {publish_frame:enc_bin(P, PacketId), WithId};
      _ when Qos == 0 ->
        {publish_frame:enc_bin(P, 0), State}
    end,
  send_binary(NextState, Frame),
  NextState.

%% @doc Picks the packet id for the next outgoing QoS > 0 publish.
%% Returns `{PacketId, NewState}'.
%%
%% If State#state.packetids is non-empty its head is used and removed from
%% the list; otherwise next_msgid is incremented and the new value is used.
%% The non-empty check is a pattern match rather than a length/1 guard, so
%% it does not walk the whole list.
packet_id(#state{packetids = [H | Tail]} = State) ->
  {H, State#state{packetids = Tail}};
packet_id(State) ->
  %% NOTE(review): next_msgid grows without bound here; MQTT packet
  %% identifiers are 16-bit - confirm whether wrap-around is handled elsewhere.
  NextId = State#state.next_msgid + 1,
  {NextId, State#state{next_msgid = NextId}}.

%% @doc Stores message P in the process dictionary under PacketId.
%% Returns the value previously stored under that key, or `undefined'.
cache_msg(PacketId, P) ->
  erlang:put(PacketId, P).

%% @doc Handles a `{reconn, NewPid}' message: logs the pid and terminates
%% this process via exit_self/0.
reconnection(NewPid) ->
  %% The format arguments must be a list, as in the other log:log/2 call
  %% in loop/2.
  log:log("reconn newPid ~p", [NewPid]),
  %% TODO: messages cached in the process dictionary are dropped here; an
  %% earlier draft intended to hand them to cache_server:send_save/2 first.
  exit_self().

%% @doc Hands every message still held in the process dictionary over to
%% cache_server for Client, and clears the dictionary.
%% Returns `ok' when the dictionary is empty, otherwise the result of
%% cache_server:send_save/2.
%%
%% Note that the whole process dictionary is read and erased, so every
%% entry in it is treated as a `{PacketId, Msg}' pair.
unarrived(Client) ->
  case get() of
    %% Non-empty check by pattern match instead of an O(n) length/1 guard.
    [_ | _] = Cached ->
      erase(),
      cache_server:send_save(Client, [Msg || {_, Msg} <- Cached]);
    [] ->
      ok
  end.

%% @doc Sends an encoded frame on the connection described by State.
%%
%% link_type = tcp:       written to the socket through the transport module
%%                        stored in the state.
%% link_type = websocket: posted to the calling process as `{send_bin, Bin}'
%%                        (presumably picked up by the websocket handler
%%                        running in that process - confirm with the caller).
send_binary(#state{link_type = tcp, trans = Transport, socket = Socket}, Bin) ->
  Transport:send(Socket, Bin);
send_binary(#state{link_type = websocket}, Bin) ->
  self() ! {send_bin, Bin}.






